package com.lyon.demo.netty.client;

import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.Pair;
import cn.hutool.core.thread.NamedThreadFactory;
import cn.hutool.core.util.SerializeUtil;
import com.lyon.demo.common.spi.DefaultSpiLoader;
import com.lyon.demo.common.spi.annotation.LyonSpi;
import com.lyon.demo.common.spi.annotation.Signleton;
import com.lyon.demo.rpc.api.core.*;
import com.lyon.demo.rpc.api.endpoint.Transport;
import com.lyon.demo.rpc.api.naming.NamingClient;
import com.lyon.demo.rpc.api.naming.NamingService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.*;

/**
 * @author Lyon
 */
@LyonSpi(value = CommonProtocol.NETTY)
@Signleton
@Slf4j
public class NettyNamingService implements NamingService, Closeable {

    private String remoteProtocol;
    private Transport transport;
    private NamingClient namingClient;
    private SocketAddress namingAddress;

    @SneakyThrows
    @Override
    public void registerService(ServiceInstance serviceInstance) {
        this.transport = namingClient.createTransport(namingAddress, 3000);
        final Command command = new Command(Types.NamingTypes.register, Versions.V_1, SerializeUtil.serialize(serviceInstance));
        final CompletableFuture<Command> future = transport.send(command);
        final Command response = future.get(60, TimeUnit.SECONDS);
        Result result = SerializeUtil.deserialize(response.getPayload());
        Result.checkSuccess(result);
    }

    @SneakyThrows
    @Override
    public ServiceInstances lookupService(String serviceName) {
        final byte[] payload = serviceName.getBytes(StandardCharsets.UTF_8);
        final Command command = new Command(Types.NamingTypes.lookup_service_name, Versions.V_1, payload);
        return getServiceInstances(command);
    }

    @SneakyThrows
    @Override
    public <T> ServiceInstances lookupService(Class<T> interfaceType) {
        final byte[] payload = SerializeUtil.serialize(interfaceType.getName());
        final Command command = new Command(Types.NamingTypes.lookup_interface, Versions.V_1, payload);
        return getServiceInstances(command);
    }

    @SneakyThrows
    @Override
    public Closeable start(SocketAddress socketAddress, Properties prop) {
        this.namingAddress = socketAddress;
        this.remoteProtocol = prop.getProperty("naming-remote-protocol");
        this.namingClient = DefaultSpiLoader.loader(NamingClient.class, remoteProtocol);
        this.transport = namingClient.createTransport(socketAddress, 3000);
        heartbeat();
        return this;
    }

    @SneakyThrows
    @Override
    public void registerInterface(String interfaceName, String serviceName) {
        final Pair<String, String> pair = new Pair<>(interfaceName, serviceName);
        final byte[] payload = SerializeUtil.serialize(pair);
        final Command command = new Command(Types.NamingTypes.register_interface, Versions.V_1, payload);
        final CompletableFuture<Command> future = transport.send(command);
        final Command response = future.get(60, TimeUnit.SECONDS);
        final Result result = SerializeUtil.deserialize(response.getPayload());
        Result.checkSuccess(result);
    }

    private void heartbeat() {
        final NamedThreadFactory namedThreadFactory = new NamedThreadFactory("naming-client-heartBeat-", true);
        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(namedThreadFactory);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            // TODO: 2022/4/16 维持心跳
            final CompletableFuture<Command> future = transport.send(new Command(Types.NamingTypes.beat, Versions.V_1, null));
            Command response = null;
            try {
                response = future.get(5, TimeUnit.SECONDS);
            } catch (Exception ex) {
                log.error("注册中心-心跳包接收异常..",ex);
            }
            assert response != null;
            Result result = SerializeUtil.deserialize(response.getPayload());
            try {
                Result.checkSuccess(result);
            } catch (Exception e) {
                log.error("注册中心-心跳包接收失败..{} -{}", result, e.getMessage());
            }
        }, 1, 3, TimeUnit.SECONDS);
    }

    @Override
    public void close() throws IOException {

    }


    @SneakyThrows
    private ServiceInstances getServiceInstances(Command command) {
        final CompletableFuture<Command> future = transport.send(command);
        final Command response = future.get(50, TimeUnit.SECONDS);
        final Result<ServiceInstances> result = SerializeUtil.deserialize(response.getPayload());
        Result.checkSuccess(result);
        ServiceInstances serviceInstances = result.getData();
        Assert.notNull(serviceInstances);
        return serviceInstances;
    }

}
